[kernel-spark] Create v2 adapters for metadata and protocol#6546
Conversation
ae2f693 to
f380f30
Compare
Range-diff: master (ae2f693 -> f380f30)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
f380f30 to
7713d81
Compare
Range-diff: master (f380f30 -> 7713d81)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
7713d81 to
08b4adc
Compare
Range-diff: master (7713d81 -> 08b4adc)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
08b4adc to
27798f7
Compare
Range-diff: master (08b4adc -> 27798f7)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
27798f7 to
6205321
Compare
Range-diff: master (27798f7 -> 6205321)
Reproduce locally: |
6205321 to
4e8ae79
Compare
Range-diff: master (6205321 -> 4e8ae79)
Reproduce locally: |
4e8ae79 to
d1be3cc
Compare
d1be3cc to
ca71e8f
Compare
Range-diff: master (d1be3cc -> ca71e8f)
Reproduce locally: |
ca71e8f to
71534ed
Compare
Range-diff: master (ca71e8f -> 71534ed)
Reproduce locally: |
71534ed to
19b49ba
Compare
Range-diff: master (71534ed -> 19b49ba)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
c803c1a to
a50c9d2
Compare
Range-diff: master (c803c1a -> a50c9d2)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
a50c9d2 to
dfe61d9
Compare
Range-diff: master (a50c9d2 -> dfe61d9)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
| @Override | ||
| public StructType partitionSchema() { | ||
| if (cachedPartitionSchema == null) { | ||
| cachedPartitionSchema = AbstractMetadata.super.partitionSchema(); | ||
| } | ||
| return cachedPartitionSchema; | ||
| } | ||
| } |
There was a problem hiding this comment.
can we add a test where partitionColumns = ["Part1"] and the schema field is part1? Note the difference in capitalization.
| public DeltaColumnMappingMode columnMappingMode() { | ||
| ColumnMapping.ColumnMappingMode kernelMode = | ||
| ColumnMapping.getColumnMappingMode(kernelMetadata.getConfiguration()); | ||
| switch (kernelMode) { | ||
| case NONE: | ||
| return NoMapping$.MODULE$; | ||
| case ID: | ||
| return IdMapping$.MODULE$; | ||
| case NAME: | ||
| return NameMapping$.MODULE$; | ||
| default: | ||
| throw new UnsupportedOperationException("Unsupported column mapping mode: " + kernelMode); | ||
| } | ||
| } |
There was a problem hiding this comment.
V1 already has DeltaColumnMappingMode.apply(String) that does this exact mapping. can we reuse it instead of maintaining two tables?
There was a problem hiding this comment.
Thank you for the info, reused the v1 apply
| cachedPartitionColumns = | ||
| CollectionConverters.asScala( | ||
| VectorUtils.toJavaList(kernelMetadata.getPartitionColumns()).stream() | ||
| .map(Object::toString) | ||
| .collect(Collectors.toList())) | ||
| .toSeq(); |
There was a problem hiding this comment.
toJavaList already returns List here. The .stream().map(Object::toString) does nothing and just hides type bugs.
| cachedPartitionColumns = | |
| CollectionConverters.asScala( | |
| VectorUtils.toJavaList(kernelMetadata.getPartitionColumns()).stream() | |
| .map(Object::toString) | |
| .collect(Collectors.toList())) | |
| .toSeq(); | |
| List<String> rawCols = VectorUtils.toJavaList(kernelMetadata.getPartitionColumns()); | |
| cachedPartitionColumns = CollectionConverters.asScala(rawCols).toSeq(); |
There was a problem hiding this comment.
I was using stream to cast type from Object to String, but you code looks better
| public Option<Set<String>> readerFeatures() { | ||
| if (cachedReaderFeatures == null) { | ||
| cachedReaderFeatures = | ||
| kernelProtocol.supportsReaderFeatures() | ||
| ? Option.apply( | ||
| CollectionConverters.asScala(kernelProtocol.getReaderFeatures()).toSet()) | ||
| : Option.empty(); | ||
| } | ||
| return cachedReaderFeatures; | ||
| } |
There was a problem hiding this comment.
volatile + check-then-set seems racy (two threads can both compute)
There was a problem hiding this comment.
kernelProtocol is declared to be a constant, so it is fine
| case NONE: | ||
| return NoMapping$.MODULE$; | ||
| case ID: | ||
| return IdMapping$.MODULE$; |
There was a problem hiding this comment.
Only NoMapping and NameMapping are tested -- IdMapping is uncovered. same with unknown mode.
dfe61d9 to
f3359c7
Compare
Range-diff: master (dfe61d9 -> f3359c7)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
TimothyW553
left a comment
There was a problem hiding this comment.
Hi @PorridgeSwim approved with one comment. Please confirm, update the branch, and ping me to merge when CI is green.
| public void testProtocolAdapterWithTableFeatures() { | ||
| // Reader features: supported but empty (version >= 3 means features are supported, even with | ||
| // an empty set). Writer features: supported and populated. | ||
| Set<String> readerFeatures = Collections.emptySet(); |
There was a problem hiding this comment.
Please include v2Checkpoint here too, because it is a reader-writer feature and this test should use a valid protocol.
| Set<String> readerFeatures = Collections.emptySet(); | |
| Set<String> readerFeatures = Collections.singleton("v2Checkpoint"); |
f3359c7 to
7bbf917
Compare
Range-diff: master (f3359c7 -> 7bbf917)
... (truncated, output exceeded 60000 bytes) Reproduce locally: |
| * Adapter from {@link io.delta.kernel.internal.actions.Metadata} to {@link | ||
| * org.apache.spark.sql.delta.v2.interop.AbstractMetadata}. | ||
| */ | ||
| public class KernelMetadataAdapter implements AbstractMetadata { |
There was a problem hiding this comment.
why is there both interop and adapters directories?
whats the difference?
There was a problem hiding this comment.
interop is under org.apache.spark.sql define the AbstractMetadata interface that v1 MetadataAction extends, while adapters is under io.delta.spark.internal.v2 that adapts v2 MetadataAction into AbstractMetadata to bridge between v1 and v2 MetadataAction.
| @Override | ||
| public DeltaColumnMappingMode columnMappingMode() { | ||
| String mode = kernelMetadata.getConfiguration().get(ColumnMapping.COLUMN_MAPPING_MODE_KEY); | ||
| return mode == null ? NoMapping$.MODULE$ : DeltaColumnMappingMode$.MODULE$.apply(mode); |
There was a problem hiding this comment.
should we cache columnMappingMode for faster access?
7bbf917 to
6070b93
Compare
6070b93 to
84f5ce6
Compare
Co-authored-by: Isaac
97651ca to
9271a62
Compare
…#6550) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/6550/files) to review incremental changes. - [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED] - [**stack/RefactorMetadataTrackingLog**](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] - [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files/953f137f8c4ce46d8b8a9605b0c7bed898e30df4..027984b6edcbad0f4731e560425c2ed9bcf8fc27)] - [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files/027984b6edcbad0f4731e560425c2ed9bcf8fc27..ada845895139edcb2727a87b39922c8e16837a99)] - [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files/ada845895139edcb2727a87b39922c8e16837a99..476762fde7b9cb9b9bc3e416c86a260cd29806ed)] - [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/476762fde7b9cb9b9bc3e416c86a260cd29806ed..13395a7f2a49db4962091e8ee919bebdab5bd4e2)] - [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/13395a7f2a49db4962091e8ee919bebdab5bd4e2..f22ba063eaf35ab69d653a2d5faefdc52f35eab5)] --------- #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description PR 2/7 in the non-additive schema evolution for V2 streaming connector stack. Decouple `DeltaSourceMetadataTrackingLog` and `PersistedMetadata` from V1-specific types so the schema log can be reused by the V2 connector. - Replace `SnapshotDescriptor` parameter in `create()` with plain `sourceTableId` and `sourceDataPath` strings - Unify `PersistedMetadata.apply` to accept `AbstractMetadata`/`AbstractProtocol` instead of V1 `Metadata`/`Protocol` - Extract the consecutive schema changes merger (V1-specific, depends on `DeltaLog`) out of the companion object into `DeltaSourceMetadataEvolutionSupport`, and inject it as a function parameter so V2 can provide its own implementation - Remove `Protocol`'s `private` constructor modifier to allow construction from abstract protocol fields All changes are structural refactors with no behavioral change. ## How was this patch tested? Existing tests in `DeltaSourceSchemaEvolutionSuite` updated to use the new API. No behavioral changes. ## Does this PR introduce _any_ user-facing changes? No.
…seable in v2 (#6562) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/6562/files) to review incremental changes. - [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED] - [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED] - [**stack/RefactorDeltaSourceMetadataEvolutionSupport**](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] - [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files/ed92a0fa2051432b6bc5784034df0b7949bbfb98..e5b2c3295843ec85753e07dc0010aa5ccebaabb7)] - [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files/e5b2c3295843ec85753e07dc0010aa5ccebaabb7..7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb)] - [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb..14956ea304c93d2343ccd7eb89a112966f07f906)] - [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/14956ea304c93d2343ccd7eb89a112966f07f906..8101b335b892a6a5b6d6fe11f4a202d14102721c)] --------- #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description PR 3/7 in the non-additive schema evolution for V2 streaming connector stack. Refactor `DeltaSourceMetadataEvolutionSupport` and `DeltaColumnMapping` so the schema change detection logic can be called from V2 without depending on V1 instance state. **`DeltaSourceMetadataEvolutionSupport`:** - Extract instance methods (`validateAndResolveMetadataEvolution`, `checkColumnMappingSchemaChangesDuringStreaming`, `resolveMetadataEvolutionForCommitRange`, etc.) to companion object statics that accept explicit parameters instead of accessing V1 `DeltaSource` via `this` - V1 trait methods now delegate to the companion object statics **`DeltaColumnMapping`:** - Widen `hasNoColumnMappingSchemaChanges` from V1 `Metadata` to `AbstractMetadata` so V2 can call it via the adapter layer - Extract `assignColumnIdAndPhysicalNameToSchema(StructType, Map)` from `assignColumnIdAndPhysicalName(Metadata, Metadata, ...)` — needed for simulating column mapping upgrades during NoMapping-to-NameMapping transitions All changes are structural refactors with no behavioral change. ## How was this patch tested? Existing tests in `DeltaSourceSchemaEvolutionSuite` continue to pass. No behavioral changes. ## Does this PR introduce _any_ user-facing changes? No.
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/6563/files) to review incremental changes. - [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED] - [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED] - [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED] - [**stack/MetadataEvolutionHandler2**](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] - [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files/a20f1f3ab452a75fc954e15c57c17327e0cb9267..0e07f87285becd6be416450ae084df454d9c94a9)] - [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/0e07f87285becd6be416450ae084df454d9c94a9..73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe)] - [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe..5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33)] - [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33..738379713040986c74f98dbebfdc6c83ec1d3f16)] --------- #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description PR 4/7 in the non-additive schema evolution for V2 streaming connector stack. Introduce `MetadataEvolutionHandler`, a Java class that implements the V1 barrier protocol for schema evolution in the V2 connector. In V1 this logic lives in `DeltaSourceMetadataEvolutionSupport`, a Scala trait mixed into `DeltaSource` that accesses stream state via `this`. Since V2's `SparkMicroBatchStream` is Java and cannot use Scala trait mixins, `MetadataEvolutionHandler` receives all dependencies via constructor injection instead. The handler covers the full schema evolution lifecycle: - **Stream start**: eager metadata tracking log initialization on first batch - **Offset generation**: injects `METADATA_CHANGE_INDEX` / `POST_METADATA_CHANGE_INDEX` barrier sentinels into the file change iterator - **Pending schema offsets**: returns barrier offsets for in-progress schema changes - **Batch commit**: updates the schema log and throws `DELTA_STREAMING_METADATA_EVOLUTION` to trigger stream restart - **Batch planning on restart**: validates and re-initializes the schema log All detection logic delegates to the shared `DeltaSourceMetadataEvolutionSupport$` companion object statics (refactored in PR 3/7). V2-specific orchestration is limited to wiring the barrier protocol into the `CloseableIterator<IndexedFile>` pipeline and collecting metadata/protocol from Kernel commit ranges via `StreamingHelper`. Also extends `StreamingHelper` with `getMetadataAndProtocolForVersionRange` to collect metadata and protocol actions from a range of Kernel commits. ## How was this patch tested? Unit tests in `MetadataEvolutionHandlerTest.java` covering: barrier protocol (METADATA_CHANGE_INDEX / POST_METADATA_CHANGE_INDEX offset generation), tracking state transitions, initialization lifecycle, offset arithmetic, pending schema change handling, and commit-time evolution exception. ## Does this PR introduce _any_ user-facing changes? No.
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/6570/files) to review incremental changes. - [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED] - [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED] - [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED] - [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED] - [**stack/NonAdditiveSchemaEvolution2**](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)] - [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/b7f6c8ebfc0882e7e2cc580f09f376be23a8d43d..dbb6246c14be1ab7f017ad9fc26455ae599ee676)] - [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/dbb6246c14be1ab7f017ad9fc26455ae599ee676..4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482)] - [stack/SchemaTrackingWithCDC](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files/4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482..a78a4ac2bc9a52605278a36b98804230258c12a2)] - [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/7f9b7f2724b2245ab7380908616303cf7ea95fca..e146cdc9ebb0572e8b0a928cc6dd3bfdc198d984)] --------- #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description PR 5/7 in the non-additive schema evolution for V2 streaming connector stack. Wire schema tracking into V2's analysis path so the analyzed plan reflects the persisted (evolved) schema instead of the live snapshot schema. - `DeltaAnalysis.verifyDeltaSourceSchemaLocation`: extend the duplicate-schema-location check to also visit `StreamingRelationV2`, keyed on the V2 `Table.name`. - `SparkTable`: open `DeltaSourceMetadataTrackingLog` once during construction (gated on `mergeConsecutiveSchemaChanges`) and seed `SchemaProvider` from the persisted metadata, so analysis-time `schema()` matches what the stream will read at runtime. - `ApplyV2ReadOptions` (renamed from `ApplyV2Streaming`): generalize the CDC-only rebuild to also fire when `schemaTrackingLocation` arrives via `extraOptions` on the catalog `readStream.table()` path; rebuild `SparkTable` with merged options so the schema-log lookup actually fires. - `MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`: V2 port of V1's helper, reused by `SparkTable` (analysis) and `SparkScan` (execution). ## How was this patch tested? `SparkTableTest`, `MetadataEvolutionHandlerTest`, `ApplyV2ReadOptionsSuite`. Unified `DeltaV2SourceSchemaEvolutionSuite` updated. ## Does this PR introduce _any_ user-facing changes? No.
…6697) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/6697/files) to review incremental changes. - [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED] - [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED] - [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED] - [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED] - [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)] [MERGED] - [**stack/NonAdditiveSchemaEvolution3**](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files)] - [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/f96643aa3cc01e7f70cc13a18b82dc27f277f11d..f612628ad931ec35c237801109f01b6fbd1379f7)] - [stack/SchemaTrackingWithCDC](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files/f612628ad931ec35c237801109f01b6fbd1379f7..4aeacfb120b33e9cdfe124352290b72f53f7cf89)] - [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/f612628ad931ec35c237801109f01b6fbd1379f7..0c818ee431ab417a4f2ffbcc609930be09d25031)] --------- #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description PR 6/7 in the non-additive schema evolution for V2 streaming connector stack. Wire `MetadataEvolutionHandler` into `SparkMicroBatchStream` and `SparkScan` so V2 streaming reads honor non-additive schema evolution (column rename/drop, type widening). - `SparkMicroBatchStream`: take `metadataTrackingLog` + `metadataPath` as constructor inputs; when a persisted entry exists, layer it onto the freshly loaded `snapshotAtSourceInit` to derive `readSnapshotAtSourceInit` (mirrors V1's `readSnapshotDescriptor`). Integrate the schema-evolution barrier protocol into `latestOffset` / `commit` / `planInputPartitions`. Skip the on-restart schema-validation check when schema tracking is active — the schema-log evolution exception covers it. - `SparkScan.toMicroBatchStream`: reload latest snapshot (the analysis-time `initialSnapshot` can be stale by stream start), open the tracking log via `MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream` with `mergeConsecutiveSchemaChanges=false` (the merger only runs at analysis), and pass it through with the checkpoint location. - `SparkScan` option allow-list: move `allowSourceColumnDrop` / `Rename` / `TypeChange` out of the unsupported list now that they are honored. ## How was this patch tested? `SparkMicroBatchStreamTest`, `MetadataEvolutionHandlerTest`. Unified suites (`DeltaV2SourceSchemaEvolutionSuite`, `TypeWideningStreamingV2SourceSuite`, `RemoveColumnMappingStreamingReadV2Suite`) move non-merger evolution scenarios from `shouldFailTests` to `shouldPassTests`; merger-dependent tests remain pending until PR 7/7. ## Does this PR introduce _any_ user-facing changes? No.
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/6698/files) to review incremental changes. - [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED] - [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED] - [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED] - [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED] - [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)] [MERGED] - [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files)] [MERGED] - [**stack/consecutiveSchemaChangesMerger**](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files)] - [stack/SchemaTrackingWithCDC](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files/e230b46c3acb772d6599662b7c5aaf17e3625498..1ed4903f1b06fd49533dad3a1cf25c9206aef2f3)] - [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/e230b46c3acb772d6599662b7c5aaf17e3625498..e3c0d530a63150797b7b882fab2dad2070452683)] --------- #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description PR 7/7 in the non-additive schema evolution for V2 streaming connector stack. Implement V2's consecutive-schema-changes merger so analysis-time evolution matches V1: runs of consecutive metadata-only commits collapse to a single tracked entry at the latest version. Without the merger, each metadata-only commit produces its own pending schema offset. - `MetadataEvolutionHandler.getMergedConsecutiveMetadataChanges`: V2 port of V1's `DeltaSourceMetadataEvolutionSupport.getMergedConsecutiveMetadataChanges`. Walks Kernel commits forward via `CommitRangeImpl` + `StreamingHelper.getCommitActionsFromRangeUnsafe`; for each commit detects file actions (ADD/REMOVE) and metadata/protocol actions; stops on the first commit with a file action or with neither metadata nor protocol; emits a merged `PersistedMetadata` at the latest metadata-only version. - `MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`: pass the merger lambda into `DeltaSourceMetadataTrackingLog.create` (was a `null` placeholder). - `DeltaSourceMetadataTrackingLog` (V1): extract `PersistedMetadata.toProtocolJson` helper so V2's merger can reuse the same protocol-JSON encoding. ## How was this patch tested? `MetadataEvolutionHandlerTest` covers merger walk semantics — stop-on-file-action, stop-on-no-metadata-or-protocol, multiple folded changes, protocol-only and combined updates. `DeltaSourceSchemaEvolutionSuite` adds parallel V1 tests for the same scenarios. Unified `DeltaV2SourceSchemaEvolutionSuite` moves the remaining merger-dependent scenarios (`consecutive schema evolutions`, `unblock with sql conf`, `streaming with a column mapping upgrade`) from `shouldFailTests` to `shouldPassTests`. ## Does this PR introduce _any_ user-facing changes? No.
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/6801/files) to review incremental changes. - [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED] - [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED] - [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED] - [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED] - [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)] [MERGED] - [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files)] [MERGED] - [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files)] [MERGED] - [**stack/SchemaTrackingWithCDC**](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files)] - [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files)] --------- #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Follow-up to the non-additive schema evolution stack: extend V2 streaming schema tracking to CDC reads so a CDC stream stops at metadata- or protocol-change commits with a barrier sentinel instead of either silently reading across the change or failing the read-compat check. - `SparkMicroBatchStream.collectAndBuildCDCIndexedFiles`: capture `Protocol` alongside `Metadata` while scanning a commit's actions, then call `MetadataEvolutionHandler.getMetadataOrProtocolChangeIndexedFileIterator` once the scan is done; when the commit diverges from source-init, return a singleton barrier (`METADATA_CHANGE_INDEX`) in place of BASE + files + END. Skip the on-commit `verifyMetadataAction` read-compat check when schema tracking is active — the barrier covers divergence. V1 splits this between `DeltaSourceCDCSupport.filterAndIndexDeltaLogs` (barrier injection) and `IndexedChangeFileSeq.filterFiles` (short-circuit); V2 collapses both into this single method. - `SparkMicroBatchStream.applyPerCommitCDCAdmission`: pass barrier sentinels through admission unchanged (they can only appear as element 0 of the per-commit list). - `SparkMicroBatchStream.getFileChangesForCDC`: apply `metadataEvolutionHandler.stopIndexedFileIteratorAtSchemaChangeBarrier` after end-boundary filtering so post-barrier commits in the same batch are truncated. V1's wrap lives in the shared `DeltaSource.getFileChangesWithRateLimit`; V2 places it inside the CDC-specific method because both `planInputPartitions` and the outer `getFileChangesWithRateLimit` reach the CDC iterator through here. - `MetadataEvolutionHandler.getMergedConsecutiveMetadataChanges`: include `AddCDCFile` in the action set the merger walks and treat any non-null CDC column as a file action that stops the merger walk. Drops the `mergeActionSet` parameter (always `CDC_ACTION_SET` now) so non-CDC and CDC analysis share the same stop semantics. Resolves the TODO(#5319) placeholder left by PR 7/7. - `SparkMicroBatchStream.CDC_ACTION_SET`: promote to `public` so `MetadataEvolutionHandler` can reuse it. ## How was this patch tested? `SparkMicroBatchStreamCDCTest` adds barrier-emission cases: - `testProcessCommit_emitsBarrierAtSchemaChange`: a metadata-only commit on a CDC stream with seeded tracking emits `[barrier, END]` from `processCommitToIndexedFilesForCDC`. - `testGetFileChangesForCDC_emitsBarrierAtSchemaChange`: end-to-end check that the barrier fires and the iterator truncates across commits — exercises metadata/protocol capture, barrier emission, admission passthrough, and cross-commit truncation in one path. `MetadataEvolutionHandlerTest` extends the merger walk tests to cover CDC file actions stopping the walk. The unified `DeltaV2SourceSchemaEvolutionCDCSuiteBase` moves all evolution scenarios out of `shouldFailTests` into `shouldPassTests` so the CDC variants of the streaming schema-evolution suite now run alongside non-CDC. ## Does this PR introduce _any_ user-facing changes? No. Co-authored-by: Timothy Wang <timothy.art@gmail.com>
🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Description
PR 1/7 in the non-additive schema evolution for V2 streaming connector stack.
The shared V1 Scala utilities (
DeltaColumnMapping,DeltaSourceMetadataEvolutionSupport) operate onAbstractMetadata/AbstractProtocol, but V2 holds Kernel types. This PR creates two adapter classes that bridge the gap:KernelMetadataAdapter: KernelMetadata→AbstractMetadata(schema conversion viaSchemaUtils, partition columns and configuration converted to Scala collections)KernelProtocolAdapter: KernelProtocol→AbstractProtocol(maps reader/writer features toOption[Set[String]])Also adds
columnMappingModeandpartitionSchemato theAbstractMetadatatrait — V1'sMetadataalready had these fields, the trait just didn't expose them.How was this patch tested?
Unit tests in
ActionAdaptersTest.java: table-features protocol, legacy protocol, full metadata round-trip, null optional fields, and null constructor rejection.Does this PR introduce any user-facing changes?
No.